/*

 * Licensed to the Apache Software Foundation (ASF) under one

 * or more contributor license agreements.  See the NOTICE file

 * distributed with this work for additional information

 * regarding copyright ownership.  The ASF licenses this file

 * to you under the Apache License, Version 2.0 (the

 * "License"); you may not use this file except in compliance

 * with the License.  You may obtain a copy of the License at

 *

 *     http://www.apache.org/licenses/LICENSE-2.0

 *

 * Unless required by applicable law or agreed to in writing, software

 * distributed under the License is distributed on an "AS IS" BASIS,

 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

 * See the License for the specific language governing permissions and

 * limitations under the License.

 */

package com.bff.gaia.unified.sdk.extensions.sql.impl;



import com.bff.gaia.unified.sdk.annotations.Experimental;

import com.bff.gaia.unified.sdk.annotations.Internal;

import com.bff.gaia.unified.sdk.extensions.sql.UnifiedSqlTable;

import com.bff.gaia.unified.sdk.extensions.sql.UnifiedSqlUdf;

import com.bff.gaia.unified.sdk.extensions.sql.impl.rel.UnifiedRelNode;

import com.bff.gaia.unified.sdk.extensions.sql.impl.udf.UnifiedBuiltinFunctionProvider;

import com.bff.gaia.unified.sdk.extensions.sql.meta.provider.ReadOnlyTableProvider;

import com.bff.gaia.unified.sdk.extensions.sql.meta.provider.TableProvider;

import com.bff.gaia.unified.sdk.extensions.sql.meta.provider.UdfUdafProvider;

import com.bff.gaia.unified.sdk.extensions.sql.meta.store.InMemoryMetaStore;

import com.bff.gaia.unified.sdk.transforms.SerializableFunction;

import com.bff.gaia.unified.vendor.guava.com.google.common.base.Strings;

import org.apache.calcite.jdbc.CalcitePrepare;

import org.apache.calcite.plan.RelOptUtil;

import org.apache.calcite.schema.Function;

import org.apache.calcite.sql.SqlExecutableStatement;

import com.bff.gaia.unified.sdk.transforms.Combine;



import java.lang.reflect.Method;

import java.sql.SQLException;

import java.util.AbstractMap.SimpleEntry;

import java.util.*;



import static com.bff.gaia.unified.vendor.guava.com.google.common.base.Preconditions.checkNotNull;



/**
 * Contains the metadata of tables/UDF functions, and exposes APIs to
 * query/validate/optimize/translate SQL statements.
 *
 * <p>Instances are obtained from the static factory methods of this class or from a {@link
 * UnifiedSqlEnvBuilder} created by {@link #builder(TableProvider)}.
 */
@Internal
@Experimental
public class UnifiedSqlEnv {
  JdbcConnection connection;
  QueryPlanner planner;

  private UnifiedSqlEnv(JdbcConnection connection, QueryPlanner planner) {
    this.connection = connection;
    this.planner = planner;
  }

  /** Creates a builder with the default schema backed by the table provider. */
  public static UnifiedSqlEnvBuilder builder(TableProvider tableProvider) {
    return new UnifiedSqlEnvBuilder(tableProvider);
  }

  /**
   * Creates an environment whose default schema is backed by a {@link ReadOnlyTableProvider}
   * constructed from the given table type and tables.
   *
   * @param tableType table type passed to the {@link ReadOnlyTableProvider}
   * @param tables tables passed to the {@link ReadOnlyTableProvider}
   * @return the environment built with default builder settings
   */
  public static UnifiedSqlEnv readOnly(String tableType, Map<String, UnifiedSqlTable> tables) {
    return withTableProvider(new ReadOnlyTableProvider(tableType, tables));
  }

  /**
   * Creates an environment with default builder settings and the default schema backed by the
   * given table provider.
   *
   * @param tableProvider provider for the default schema, must not be null
   * @return the built environment
   */
  public static UnifiedSqlEnv withTableProvider(TableProvider tableProvider) {
    return builder(tableProvider).build();
  }

  /**
   * Creates an environment backed by a fresh {@link InMemoryMetaStore} in which every given table
   * provider has been registered.
   *
   * @param tableProviders providers to register with the in-memory meta store
   * @return the built environment
   */
  public static UnifiedSqlEnv inMemory(TableProvider... tableProviders) {
    InMemoryMetaStore inMemoryMetaStore = new InMemoryMetaStore();
    for (TableProvider tableProvider : tableProviders) {
      inMemoryMetaStore.registerProvider(tableProvider);
    }

    return withTableProvider(inMemoryMetaStore);
  }

  /**
   * Converts the query into a {@link UnifiedRelNode} using the configured planner.
   *
   * @param query SQL query text
   * @return the relational node produced by the planner
   * @throws ParseException if the planner reports a parse failure
   */
  public UnifiedRelNode parseQuery(String query) throws ParseException {
    return planner.convertToUnifiedRel(query);
  }

  /**
   * Tells whether the statement parses to a {@link SqlExecutableStatement}.
   *
   * @param sqlStatement SQL statement text
   * @return {@code true} if the parsed statement is a {@link SqlExecutableStatement}
   * @throws ParseException if the planner reports a parse failure
   */
  public boolean isDdl(String sqlStatement) throws ParseException {
    return planner.parse(sqlStatement) instanceof SqlExecutableStatement;
  }

  /**
   * Parses the statement and executes it against the context returned by {@link #getContext()}.
   *
   * <p>The parsed statement is cast to {@link SqlExecutableStatement} without a prior check, so a
   * statement for which {@link #isDdl(String)} is {@code false} fails with a {@link
   * ClassCastException}.
   *
   * @param sqlStatement SQL statement text
   * @throws ParseException if the planner reports a parse failure
   */
  public void executeDdl(String sqlStatement) throws ParseException {
    SqlExecutableStatement ddl = (SqlExecutableStatement) planner.parse(sqlStatement);
    ddl.execute(getContext());
  }

  /** Returns a prepare context created by the underlying connection. */
  public CalcitePrepare.Context getContext() {
    return connection.createPrepareContext();
  }

  /** Returns the pipeline options map held by the underlying connection. */
  public Map<String, String> getPipelineOptions() {
    return connection.getPipelineOptionsMap();
  }

  /**
   * Returns the textual form of the plan the planner produces for the statement.
   *
   * @param sqlString SQL statement text
   * @return the plan rendered by {@link RelOptUtil#toString}
   * @throws ParseException wrapping any exception raised while planning or rendering
   */
  public String explain(String sqlString) throws ParseException {
    try {
      return RelOptUtil.toString(planner.convertToUnifiedRel(sqlString));
    } catch (Exception e) {
      throw new ParseException("Unable to parse statement", e);
    }
  }

  /** UnifiedSqlEnv's Builder. */
  public static class UnifiedSqlEnvBuilder {
    private static final String CALCITE_PLANNER =
        "com.bff.gaia.unified.sdk.extensions.sql.impl.CalciteQueryPlanner";

    // Assigned once at construction; the collections themselves are filled by the add* methods.
    private final TableProvider defaultTableProvider;
    private final Map<String, TableProvider> schemaMap = new HashMap<>();
    private final Set<Map.Entry<String, Function>> functionSet = new HashSet<>();

    private String queryPlannerClassName;
    private String currentSchemaName;
    private boolean autoLoadBuiltinFunctions;
    private boolean autoLoadUdfs;

    private UnifiedSqlEnvBuilder(TableProvider tableProvider) {
      checkNotNull(tableProvider, "Table provider for the default schema must be set.");

      defaultTableProvider = tableProvider;
      queryPlannerClassName = CALCITE_PLANNER;
      autoLoadUdfs = false;
      autoLoadBuiltinFunctions = false;
    }

    /**
     * Add a top-level schema backed by the table provider.
     *
     * @throws RuntimeException if a schema with the same name was already added to this builder
     */
    public UnifiedSqlEnvBuilder addSchema(String name, TableProvider tableProvider) {
      if (schemaMap.containsKey(name)) {
        throw new RuntimeException("Schema " + name + " is registered twice.");
      }

      schemaMap.put(name, tableProvider);
      return this;
    }

    /** Set the current (default) schema. */
    public UnifiedSqlEnvBuilder setCurrentSchema(String name) {
      currentSchemaName = name;
      return this;
    }

    /**
     * Register a UDF function which can be used in SQL expression.
     *
     * @param functionName name under which the function is registered
     * @param clazz class declaring the implementation
     * @param method name of the implementing method on {@code clazz}
     */
    public UnifiedSqlEnvBuilder addUdf(String functionName, Class<?> clazz, String method) {
      functionSet.add(new SimpleEntry<>(functionName, UdfImpl.create(clazz, method)));
      return this;
    }

    /**
     * Register a UDF function which can be used in SQL expression.
     *
     * <p>The implementing method is the one named by {@link UnifiedSqlUdf#UDF_METHOD}.
     */
    public UnifiedSqlEnvBuilder addUdf(String functionName, Class<? extends UnifiedSqlUdf> clazz) {
      return addUdf(functionName, clazz, UnifiedSqlUdf.UDF_METHOD);
    }

    /**
     * Register a UDF function which can be used in SQL expression.
     *
     * <p>The function is registered through the runtime class of {@code sfn} and its {@code apply}
     * method.
     */
    public UnifiedSqlEnvBuilder addUdf(String functionName, SerializableFunction sfn) {
      return addUdf(functionName, sfn.getClass(), "apply");
    }

    /**
     * Register a UDAF function which can be used in GROUP-BY expression.
     *
     * <p>See {@link Combine.CombineFn} on how to implement a UDAF.
     */
    public UnifiedSqlEnvBuilder addUdaf(String functionName, Combine.CombineFn combineFn) {
      functionSet.add(new SimpleEntry<>(functionName, new UdafImpl(combineFn)));
      return this;
    }

    /** Load UDF/UDAFs from {@link UdfUdafProvider}. */
    public UnifiedSqlEnvBuilder autoLoadUserDefinedFunctions() {
      autoLoadUdfs = true;
      return this;
    }

    /** Load Unified SQL built-in functions defined in {@link UnifiedBuiltinFunctionProvider}. */
    public UnifiedSqlEnvBuilder autoLoadBuiltinFunctions() {
      autoLoadBuiltinFunctions = true;
      return this;
    }

    /**
     * Set the fully qualified class name of the {@link QueryPlanner} to instantiate in {@link
     * #build()}.
     *
     * <p>Any class other than the default Calcite planner is created reflectively through a
     * constructor taking a single {@link JdbcConnection}.
     */
    public UnifiedSqlEnvBuilder setQueryPlannerClassName(String name) {
      queryPlannerClassName = name;
      return this;
    }

    /**
     * Build function to create an instance of UnifiedSqlEnv based on preset fields.
     *
     * <p>Steps, in order: open a connection for the default table provider, register the added
     * schemas and the current schema, collect built-in functions and service-loaded UDF/UDAFs when
     * enabled, add all collected functions to the connection's current schema, and instantiate the
     * query planner.
     *
     * @return UnifiedSqlEnv.
     */
    public UnifiedSqlEnv build() {
      JdbcConnection jdbcConnection = JdbcDriver.connect(defaultTableProvider);

      configureSchemas(jdbcConnection);

      // Both loaders only add to functionSet; the functions reach the connection in addUdfsUdafs.
      loadUnifiedBuiltinFunctions();
      loadUdfs();

      addUdfsUdafs(jdbcConnection);

      QueryPlanner planner = instantiatePlanner(jdbcConnection);

      return new UnifiedSqlEnv(jdbcConnection, planner);
    }

    /** Registers every added schema on the connection, then selects the current schema if set. */
    private void configureSchemas(JdbcConnection jdbcConnection) {
      // SetSchema adds the schema with the specified name
      // backed by the table provider.
      // Does not update the current default schema.
      schemaMap.forEach(jdbcConnection::setSchema);

      if (Strings.isNullOrEmpty(currentSchemaName)) {
        return;
      }

      try {
        jdbcConnection.setSchema(currentSchemaName);
      } catch (SQLException e) {
        throw new RuntimeException(e);
      }
    }

    /** Collects built-in functions from service-loaded providers when auto-loading is enabled. */
    private void loadUnifiedBuiltinFunctions() {
      if (!autoLoadBuiltinFunctions) {
        return;
      }

      for (UnifiedBuiltinFunctionProvider provider :
          ServiceLoader.load(UnifiedBuiltinFunctionProvider.class)) {
        loadBuiltinUdf(provider.getBuiltinMethods());
      }
    }

    /** Adds one function entry per method, keyed by the map key the method is listed under. */
    private void loadBuiltinUdf(Map<String, List<Method>> methods) {
      for (Map.Entry<String, List<Method>> entry : methods.entrySet()) {
        for (Method method : entry.getValue()) {
          functionSet.add(new SimpleEntry<>(entry.getKey(), UdfImpl.create(method)));
        }
      }
    }

    /** Collects UDFs and UDAFs from service-loaded providers when auto-loading is enabled. */
    private void loadUdfs() {
      if (!autoLoadUdfs) {
        return;
      }

      ServiceLoader.load(UdfUdafProvider.class)
          .forEach(
              ins -> {
                ins.getUnifiedSqlUdfs().forEach(this::addUdf);
                ins.getSerializableFunctionUdfs().forEach(this::addUdf);
                ins.getUdafs().forEach(this::addUdaf);
              });
    }

    /** Adds every collected function to the current schema of the connection. */
    private void addUdfsUdafs(JdbcConnection connection) {
      for (Map.Entry<String, Function> functionEntry : functionSet) {
        connection.getCurrentSchemaPlus().add(functionEntry.getKey(), functionEntry.getValue());
      }
    }

    /**
     * Creates the query planner named by {@code queryPlannerClassName}.
     *
     * @throws RuntimeException if a non-default planner cannot be constructed reflectively
     */
    private QueryPlanner instantiatePlanner(JdbcConnection jdbcConnection) {
      // Constant-first comparison: a null class name must not throw here. It falls through to the
      // reflective path, whose failures are wrapped with the offending name below.
      if (CALCITE_PLANNER.equals(queryPlannerClassName)) {
        return new CalciteQueryPlanner(jdbcConnection);
      }

      try {
        return (QueryPlanner)
            Class.forName(queryPlannerClassName)
                .getConstructor(JdbcConnection.class)
                .newInstance(jdbcConnection);
      } catch (Exception e) {
        throw new RuntimeException(
            String.format("Cannot construct query planner %s", queryPlannerClassName), e);
      }
    }
  }
}